View Javadoc
1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.lang.annotation.Annotation;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.EnumMap;
28  import java.util.Iterator;
29  import java.util.LinkedHashSet;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.ThreadFactory;
36  
37  import org.apache.maven.surefire.junitcore.JUnitCoreParameters;
38  import org.apache.maven.surefire.report.ConsoleStream;
39  import org.apache.maven.surefire.testset.TestSetFailedException;
40  import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
41  import org.junit.internal.runners.ErrorReportingRunner;
42  import org.junit.runner.Description;
43  import org.junit.runner.Runner;
44  import org.junit.runner.manipulation.Filter;
45  import org.junit.runner.manipulation.NoTestsRemainException;
46  import org.junit.runner.notification.RunNotifier;
47  import org.junit.runners.ParentRunner;
48  import org.junit.runners.Suite;
49  import org.junit.runners.model.InitializationError;
50  import org.junit.runners.model.RunnerBuilder;
51  
52  import static org.apache.maven.surefire.junitcore.pc.ParallelComputerUtil.resolveConcurrency;
53  import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategy;
54  import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategyUnbounded;
55  import static org.apache.maven.surefire.junitcore.pc.Type.CLASSES;
56  import static org.apache.maven.surefire.junitcore.pc.Type.METHODS;
57  import static org.apache.maven.surefire.junitcore.pc.Type.SUITES;
58  
59  /**
60   * Executing suites, classes and methods with defined concurrency. In this example the threads which completed
61   * the suites and classes can be reused in parallel methods.
62   * <pre>
63   * JUnitCoreParameters parameters = ...;
64   * ParallelComputerBuilder builder = new ParallelComputerBuilder(parameters);
65   * builder.useOnePool(8).parallelSuites(2).parallelClasses(4).parallelMethods();
66   * ParallelComputerBuilder.ParallelComputer computer = builder.buildComputer();
67   * Class<?>[] tests = {...};
68   * new JUnitCore().run(computer, tests);
69   * </pre>
70   * Note that the type has always at least one thread even if unspecified. The capacity in
71   * {@link ParallelComputerBuilder#useOnePool(int)} must be greater than the number of concurrent suites and classes
72   * altogether.
73   * <p/>
74   * The Computer can be stopped in a separate thread. Pending tests will be interrupted if the argument is
75   * <tt>true</tt>.
76   * <pre>
77   * computer.describeStopped(true);
78   * </pre>
79   *
80   * @author Tibor Digana (tibor17)
81   * @since 2.16
82   */
83  public final class ParallelComputerBuilder
84  {
85      private static final ThreadFactory DAEMON_THREAD_FACTORY = DaemonThreadFactory.newDaemonThreadFactory();
86  
87      private static final Class<? extends Annotation> JCIP_NOT_THREAD_SAFE = loadNotThreadSafeAnnotations();
88  
89      private static final Set<?> NULL_SINGLETON = Collections.singleton( null );
90  
91      static final int TOTAL_POOL_SIZE_UNDEFINED = 0;
92  
93      private final Map<Type, Integer> parallelGroups = new EnumMap<Type, Integer>( Type.class );
94  
95      private final ConsoleStream logger;
96  
97      private boolean useSeparatePools;
98  
99      private int totalPoolSize;
100 
101     private JUnitCoreParameters parameters;
102 
103     private boolean optimize;
104 
105     private boolean runningInTests;
106 
107     /**
108      * Calling {@link #useSeparatePools()}.
109      * Can be used only in unit tests.
110      * Do NOT call this constructor in production.
111      */
112     ParallelComputerBuilder( ConsoleStream logger )
113     {
114         this.logger = logger;
115         runningInTests = true;
116         useSeparatePools();
117         parallelGroups.put( SUITES, 0 );
118         parallelGroups.put( CLASSES, 0 );
119         parallelGroups.put( METHODS, 0 );
120     }
121 
122     public ParallelComputerBuilder( ConsoleStream logger, JUnitCoreParameters parameters )
123     {
124         this( logger );
125         runningInTests = false;
126         this.parameters = parameters;
127     }
128 
129     public ParallelComputer buildComputer()
130     {
131         return new PC();
132     }
133 
134     ParallelComputerBuilder useSeparatePools()
135     {
136         totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
137         useSeparatePools = true;
138         return this;
139     }
140 
141     ParallelComputerBuilder useOnePool()
142     {
143         totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
144         useSeparatePools = false;
145         return this;
146     }
147 
148     /**
149      * @param totalPoolSize Pool size where suites, classes and methods are executed in parallel.
150      *                      If the <tt>totalPoolSize</tt> is {@link Integer#MAX_VALUE}, the pool capacity is not
151      *                      limited.
152      * @throws IllegalArgumentException If <tt>totalPoolSize</tt> is &lt; 1.
153      */
154     ParallelComputerBuilder useOnePool( int totalPoolSize )
155     {
156         if ( totalPoolSize < 1 )
157         {
158             throw new IllegalArgumentException( "Size of common pool is less than 1." );
159         }
160         this.totalPoolSize = totalPoolSize;
161         useSeparatePools = false;
162         return this;
163     }
164 
165     boolean isOptimized()
166     {
167         return optimize;
168     }
169 
170     ParallelComputerBuilder optimize( boolean optimize )
171     {
172         this.optimize = optimize;
173         return this;
174     }
175 
176     ParallelComputerBuilder parallelSuites()
177     {
178         return parallel( SUITES );
179     }
180 
181     ParallelComputerBuilder parallelSuites( int nThreads )
182     {
183         return parallel( nThreads, SUITES );
184     }
185 
186     ParallelComputerBuilder parallelClasses()
187     {
188         return parallel( CLASSES );
189     }
190 
191     ParallelComputerBuilder parallelClasses( int nThreads )
192     {
193         return parallel( nThreads, CLASSES );
194     }
195 
196     ParallelComputerBuilder parallelMethods()
197     {
198         return parallel( METHODS );
199     }
200 
201     ParallelComputerBuilder parallelMethods( int nThreads )
202     {
203         return parallel( nThreads, METHODS );
204     }
205 
206     private ParallelComputerBuilder parallel( int nThreads, Type parallelType )
207     {
208         if ( nThreads < 0 )
209         {
210             throw new IllegalArgumentException( "negative nThreads " + nThreads );
211         }
212 
213         if ( parallelType == null )
214         {
215             throw new IllegalArgumentException( "null parallelType" );
216         }
217 
218         parallelGroups.put( parallelType, nThreads );
219         return this;
220     }
221 
222     private ParallelComputerBuilder parallel( Type parallelType )
223     {
224         return parallel( Integer.MAX_VALUE, parallelType );
225     }
226 
227     private double parallelTestsTimeoutInSeconds()
228     {
229         return parameters == null ? 0d : parameters.getParallelTestsTimeoutInSeconds();
230     }
231 
232     private double parallelTestsTimeoutForcedInSeconds()
233     {
234         return parameters == null ? 0d : parameters.getParallelTestsTimeoutForcedInSeconds();
235     }
236 
237     @SuppressWarnings( "unchecked" )
238     private static Class<? extends Annotation> loadNotThreadSafeAnnotations()
239     {
240         try
241         {
242             Class c = Class.forName( "net.jcip.annotations.NotThreadSafe" );
243             return c.isAnnotation() ? (Class<? extends Annotation>) c : null;
244         }
245         catch ( ClassNotFoundException e )
246         {
247             return null;
248         }
249     }
250 
251     final class PC
252         extends ParallelComputer
253     {
254         private final SingleThreadScheduler notThreadSafeTests =
255             new SingleThreadScheduler( ParallelComputerBuilder.this.logger );
256 
257         private final Collection<ParentRunner> suites = new LinkedHashSet<ParentRunner>();
258 
259         private final Collection<ParentRunner> nestedSuites = new LinkedHashSet<ParentRunner>();
260 
261         private final Collection<ParentRunner> classes = new LinkedHashSet<ParentRunner>();
262 
263         private final Collection<ParentRunner> nestedClasses = new LinkedHashSet<ParentRunner>();
264 
265         private final Collection<Runner> notParallelRunners = new LinkedHashSet<Runner>();
266 
267         private int poolCapacity;
268 
269         private boolean splitPool;
270 
271         private final Map<Type, Integer> allGroups;
272 
273         private long nestedClassesChildren;
274 
275         private volatile Scheduler master;
276 
277         private PC()
278         {
279             super( parallelTestsTimeoutInSeconds(), parallelTestsTimeoutForcedInSeconds() );
280             allGroups = new EnumMap<Type, Integer>( ParallelComputerBuilder.this.parallelGroups );
281             poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
282             splitPool = ParallelComputerBuilder.this.useSeparatePools;
283         }
284 
285         Collection<ParentRunner> getSuites()
286         {
287             return suites;
288         }
289 
290         Collection<ParentRunner> getNestedSuites()
291         {
292             return nestedSuites;
293         }
294 
295         Collection<ParentRunner> getClasses()
296         {
297             return classes;
298         }
299 
300         Collection<ParentRunner> getNestedClasses()
301         {
302             return nestedClasses;
303         }
304 
305         Collection<Runner> getNotParallelRunners()
306         {
307             return notParallelRunners;
308         }
309 
310         int getPoolCapacity()
311         {
312             return poolCapacity;
313         }
314 
315         boolean isSplitPool()
316         {
317             return splitPool;
318         }
319 
320         @Override
321         protected ShutdownResult describeStopped( boolean shutdownNow )
322         {
323             ShutdownResult shutdownResult = notThreadSafeTests.describeStopped( shutdownNow );
324             final Scheduler m = master;
325             if ( m != null )
326             {
327                 ShutdownResult shutdownResultOfMaster = m.describeStopped( shutdownNow );
328                 shutdownResult.getTriggeredTests().addAll( shutdownResultOfMaster.getTriggeredTests() );
329                 shutdownResult.getIncompleteTests().addAll( shutdownResultOfMaster.getIncompleteTests() );
330             }
331             return shutdownResult;
332         }
333 
334         @Override
335         boolean shutdownThreadPoolsAwaitingKilled()
336         {
337             boolean notInterrupted = notThreadSafeTests.shutdownThreadPoolsAwaitingKilled();
338             final Scheduler m = master;
339             if ( m != null )
340             {
341                 notInterrupted &= m.shutdownThreadPoolsAwaitingKilled();
342             }
343             return notInterrupted;
344         }
345 
346         @Override
347         public Runner getSuite( RunnerBuilder builder, Class<?>[] cls )
348             throws InitializationError
349         {
350             try
351             {
352                 super.getSuite( builder, cls );
353                 populateChildrenFromSuites();
354 
355                 WrappedRunners suiteSuites = wrapRunners( suites );
356                 WrappedRunners suiteClasses = wrapRunners( classes );
357 
358                 long suitesCount = suites.size();
359                 long classesCount = classes.size() + nestedClasses.size();
360                 long methodsCount = suiteClasses.embeddedChildrenCount + nestedClassesChildren;
361                 if ( !ParallelComputerBuilder.this.runningInTests )
362                 {
363                     determineThreadCounts( suitesCount, classesCount, methodsCount );
364                 }
365 
366                 return setSchedulers( suiteSuites.wrappingSuite, suiteClasses.wrappingSuite );
367             }
368             catch ( TestSetFailedException e )
369             {
370                 throw new InitializationError( Collections.<Throwable>singletonList( e ) );
371             }
372         }
373 
374         @Override
375         protected Runner getRunner( RunnerBuilder builder, Class<?> testClass )
376             throws Throwable
377         {
378             Runner runner = super.getRunner( builder, testClass );
379             if ( canSchedule( runner ) )
380             {
381                 if ( !isThreadSafe( runner ) )
382                 {
383                     ( ( ParentRunner ) runner ).setScheduler( notThreadSafeTests.newRunnerScheduler() );
384                     notParallelRunners.add( runner );
385                 }
386                 else if ( runner instanceof Suite )
387                 {
388                     suites.add( (Suite) runner );
389                 }
390                 else
391                 {
392                     classes.add( (ParentRunner) runner );
393                 }
394             }
395             else
396             {
397                 notParallelRunners.add( runner );
398             }
399             return runner;
400         }
401 
402         private void determineThreadCounts( long suites, long classes, long methods )
403             throws TestSetFailedException
404         {
405             RunnerCounter counts = null;
406             if ( ParallelComputerBuilder.this.optimize )
407             {
408                 counts = new RunnerCounter( suites, classes, methods );
409             }
410             Concurrency concurrency =
411                     resolveConcurrency( ParallelComputerBuilder.this.parameters, counts );
412             allGroups.put( SUITES, concurrency.suites );
413             allGroups.put( CLASSES, concurrency.classes );
414             allGroups.put( METHODS, concurrency.methods );
415             poolCapacity = concurrency.capacity;
416             splitPool &= concurrency.capacity <= 0; // fault if negative; should not happen
417         }
418 
419         private <T extends Runner> WrappedRunners wrapRunners( Collection<T> runners )
420             throws InitializationError
421         {
422             // Do NOT use allGroups here.
423             long childrenCounter = 0;
424             ArrayList<Runner> runs = new ArrayList<Runner>();
425             for ( T runner : runners )
426             {
427                 if ( runner != null )
428                 {
429                     int children = countChildren( runner );
430                     childrenCounter += children;
431                     if ( children != 0 )
432                     {
433                         runs.add( runner );
434                     }
435                 }
436             }
437             return runs.isEmpty() ? new WrappedRunners() : new WrappedRunners( createSuite( runs ), childrenCounter );
438         }
439 
440         private int countChildren( Runner runner )
441         {
442             Description description = runner.getDescription();
443             Collection children = description == null ? null : description.getChildren();
444             return children == null ? 0 : children.size();
445         }
446 
447         private ExecutorService createPool( int poolSize )
448         {
449             return poolSize < Integer.MAX_VALUE
450                 ? Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY )
451                 : Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
452         }
453 
454         private Scheduler createMaster( ExecutorService pool, int poolSize )
455         {
456             // can be 0, 1, 2 or 3
457             final int finalRunnersCounter = countFinalRunners();
458 
459             final SchedulingStrategy strategy;
460             if ( finalRunnersCounter <= 1 || poolSize <= 1 )
461             {
462                 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
463             }
464             else if ( pool != null && poolSize == Integer.MAX_VALUE )
465             {
466                 strategy = new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool );
467             }
468             else
469             {
470                 strategy = createParallelStrategy( ParallelComputerBuilder.this.logger, finalRunnersCounter );
471             }
472             return new Scheduler( ParallelComputerBuilder.this.logger, null, strategy );
473         }
474 
475         private int countFinalRunners()
476         {
477             int counter = notParallelRunners.isEmpty() ? 0 : 1;
478 
479             if ( !suites.isEmpty() && allGroups.get( SUITES ) > 0 )
480             {
481                 ++counter;
482             }
483 
484             if ( !classes.isEmpty() && allGroups.get( CLASSES ) > 0 )
485             {
486                 ++counter;
487             }
488 
489             return counter;
490         }
491 
492         private void populateChildrenFromSuites()
493         {
494             // Do NOT use allGroups here.
495             Filter filter = new SuiteFilter();
496             for ( Iterator<ParentRunner> it = suites.iterator(); it.hasNext(); )
497             {
498                 ParentRunner suite = it.next();
499                 try
500                 {
501                     suite.filter( filter );
502                 }
503                 catch ( NoTestsRemainException e )
504                 {
505                     it.remove();
506                 }
507             }
508         }
509 
510         private int totalPoolSize()
511         {
512             if ( poolCapacity == TOTAL_POOL_SIZE_UNDEFINED )
513             {
514                 int total = 0;
515                 for ( int nThreads : allGroups.values() )
516                 {
517                     total += nThreads;
518                     if ( total < 0 )
519                     {
520                         total = Integer.MAX_VALUE;
521                         break;
522                     }
523                 }
524                 return total;
525             }
526             else
527             {
528                 return poolCapacity;
529             }
530         }
531 
532         private Runner setSchedulers( ParentRunner suiteSuites, ParentRunner suiteClasses )
533             throws InitializationError
534         {
535             int parallelSuites = allGroups.get( SUITES );
536             int parallelClasses = allGroups.get( CLASSES );
537             int parallelMethods = allGroups.get( METHODS );
538             int poolSize = totalPoolSize();
539             ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool( poolSize );
540             master = createMaster( commonPool, poolSize );
541 
542             if ( suiteSuites != null )
543             {
544                 // a scheduler for parallel suites
545                 if ( commonPool != null && parallelSuites > 0 )
546                 {
547                     Balancer balancer = BalancerFactory.createBalancerWithFairness( parallelSuites );
548                     suiteSuites.setScheduler( createScheduler( null, commonPool, true, balancer ) );
549                 }
550                 else
551                 {
552                     suiteSuites.setScheduler( createScheduler( parallelSuites ) );
553                 }
554             }
555 
556             // schedulers for parallel classes
557             ArrayList<ParentRunner> allSuites = new ArrayList<ParentRunner>( suites );
558             allSuites.addAll( nestedSuites );
559             if ( suiteClasses != null )
560             {
561                 allSuites.add( suiteClasses );
562             }
563             if ( !allSuites.isEmpty() )
564             {
565                 setSchedulers( allSuites, parallelClasses, commonPool );
566             }
567 
568             // schedulers for parallel methods
569             ArrayList<ParentRunner> allClasses = new ArrayList<ParentRunner>( classes );
570             allClasses.addAll( nestedClasses );
571             if ( !allClasses.isEmpty() )
572             {
573                 setSchedulers( allClasses, parallelMethods, commonPool );
574             }
575 
576             // resulting runner for Computer#getSuite() scheduled by master scheduler
577             ParentRunner all = createFinalRunner( removeNullRunners(
578                 Arrays.<Runner>asList( suiteSuites, suiteClasses, createSuite( notParallelRunners ) )
579             ) );
580             all.setScheduler( master );
581             return all;
582         }
583 
584         private ParentRunner createFinalRunner( List<Runner> runners )
585             throws InitializationError
586         {
587             return new Suite( null, runners )
588             {
589                 @Override
590                 public void run( RunNotifier notifier )
591                 {
592                     try
593                     {
594                         beforeRunQuietly();
595                         super.run( notifier );
596                     }
597                     finally
598                     {
599                         afterRunQuietly();
600                     }
601                 }
602             };
603         }
604 
605         private void setSchedulers( Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool )
606         {
607             if ( commonPool != null )
608             {
609                 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness( poolSize );
610                 boolean doParallel = poolSize > 0;
611                 for ( ParentRunner runner : runners )
612                 {
613                     runner.setScheduler(
614                         createScheduler( runner.getDescription(), commonPool, doParallel, concurrencyLimit ) );
615                 }
616             }
617             else
618             {
619                 ExecutorService pool = null;
620                 if ( poolSize == Integer.MAX_VALUE )
621                 {
622                     pool = Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
623                 }
624                 else if ( poolSize > 0 )
625                 {
626                     pool = Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY );
627                 }
628                 boolean doParallel = pool != null;
629                 for ( ParentRunner runner : runners )
630                 {
631                     runner.setScheduler( createScheduler( runner.getDescription(), pool, doParallel,
632                                                           BalancerFactory.createInfinitePermitsBalancer() ) );
633                 }
634             }
635         }
636 
637         private Scheduler createScheduler( Description desc, ExecutorService pool, boolean doParallel,
638                                            Balancer concurrency )
639         {
640             SchedulingStrategy strategy =
641                     doParallel & pool != null
642                     ? new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool )
643                     : new InvokerStrategy( ParallelComputerBuilder.this.logger );
644             return new Scheduler( ParallelComputerBuilder.this.logger, desc, master, strategy, concurrency );
645         }
646 
647         private Scheduler createScheduler( int poolSize )
648         {
649             final SchedulingStrategy strategy;
650             if ( poolSize == Integer.MAX_VALUE )
651             {
652                 strategy = createParallelStrategyUnbounded( ParallelComputerBuilder.this.logger );
653             }
654             else if ( poolSize == 0 )
655             {
656                 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
657             }
658             else
659             {
660                 strategy = createParallelStrategy( ParallelComputerBuilder.this.logger, poolSize );
661             }
662             return new Scheduler( ParallelComputerBuilder.this.logger, null, master, strategy );
663         }
664 
665         private boolean canSchedule( Runner runner )
666         {
667             return !( runner instanceof ErrorReportingRunner ) && runner instanceof ParentRunner;
668         }
669 
670         private boolean isThreadSafe( Runner runner )
671         {
672             return runner.getDescription().getAnnotation( JCIP_NOT_THREAD_SAFE ) == null;
673         }
674 
675         private class SuiteFilter
676             extends Filter
677         {
678             // Do NOT use allGroups in SuiteFilter.
679 
680             @Override
681             public boolean shouldRun( Description description )
682             {
683                 return true;
684             }
685 
686             @Override
687             public void apply( Object child )
688                 throws NoTestsRemainException
689             {
690                 super.apply( child );
691                 if ( child instanceof ParentRunner )
692                 {
693                     ParentRunner runner = ( ParentRunner ) child;
694                     if ( !isThreadSafe( runner ) )
695                     {
696                         runner.setScheduler( notThreadSafeTests.newRunnerScheduler() );
697                     }
698                     else if ( child instanceof Suite )
699                     {
700                         nestedSuites.add( (Suite) child );
701                     }
702                     else
703                     {
704                         ParentRunner parentRunner = (ParentRunner) child;
705                         nestedClasses.add( parentRunner );
706                         nestedClassesChildren += parentRunner.getDescription().getChildren().size();
707                     }
708                 }
709             }
710 
711             @Override
712             public String describe()
713             {
714                 return "";
715             }
716         }
717     }
718 
719     private static Suite createSuite( Collection<Runner> runners )
720         throws InitializationError
721     {
722         final List<Runner> onlyRunners = removeNullRunners( runners );
723         return onlyRunners.isEmpty() ? null : new Suite( null, onlyRunners )
724         {
725         };
726     }
727 
728     private static List<Runner> removeNullRunners( Collection<Runner> runners )
729     {
730         final List<Runner> onlyRunners = new ArrayList<Runner>( runners );
731         onlyRunners.removeAll( NULL_SINGLETON );
732         return onlyRunners;
733     }
734 }